Minimal test code
```python
df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "espresso-machine-events")\
    .option("startingOffsets", "earliest")\
    .option("maxOffsetsPerTrigger", 5)\
    .load()
```
```python
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

# Define the streaming query and output to the console.
# A query takes exactly one trigger; the alternatives tested below were:
#   .trigger(once=True)
#   .trigger(availableNow=True)
#   .trigger(continuous="1 second")
query = df.writeStream\
    .outputMode("append")\
    .format("console")\
    .trigger(processingTime="2 seconds")\
    .option("checkpointLocation", str(data_path / "tmp/checkpoints/processingTime"))\
    .start()
```
Messages are produced at 1 sample per second.
| Trigger Mode | Checkpoint | maxOffsetsPerTrigger | Output |
|---|---|---|---|
| `trigger(once=True)` | No | Ignored | Entire topic content in Batch 0 |
| `trigger(once=True)` | Yes | Ignored | Batch 0: entire topic at query start. Batch 1: messages created after Batch 0. |
| `trigger(availableNow=True)` | N/A | 5 | 5 elements per batch. Terminates at the last offset available at the time the query started. |
| `trigger(processingTime="5 seconds")` | N/A | 2 | Triggers a batch every 5 seconds. Each batch contains only 2 elements. |
| `trigger(processingTime="5 seconds")` | N/A | 10 | Triggers a batch every 5 seconds. Each batch contains all newly available elements, since maxOffsetsPerTrigger=10 accommodates all messages generated since the last batch. |
| `trigger(continuous="1 second")` | Created automatically in /tmp if not provided | Ignored | Runs continuously to process each message with low latency. Decides on its own how many messages to put into each batch. |
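The batching behavior in the table can be sketched with a small simulation (plain Python, no Spark; `plan_batches` is an illustrative helper, not a Spark API) that splits the available offset range into micro-batches the way `maxOffsetsPerTrigger` does:

```python
from typing import Optional

def plan_batches(available_offsets: int, max_offsets_per_trigger: Optional[int]) -> list[int]:
    """Return the number of offsets each micro-batch would consume.

    Simplified model of micro-batch planning: with no cap, everything
    lands in one batch; with a cap, each batch takes at most
    max_offsets_per_trigger offsets until the backlog is drained.
    """
    if max_offsets_per_trigger is None:
        return [available_offsets] if available_offsets else []
    batches = []
    remaining = available_offsets
    while remaining > 0:
        batches.append(min(remaining, max_offsets_per_trigger))
        remaining -= batches[-1]
    return batches

# 12 available offsets with maxOffsetsPerTrigger=5 -> three batches: 5, 5, 2
print(plan_batches(12, 5))   # [5, 5, 2]
# trigger(once=True) ignores the cap -> one batch of 12
print(plan_batches(12, None))  # [12]
```

This mirrors the `availableNow` row above: the backlog at query start is drained in capped batches, then the query terminates.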
| Data Engineering Task | Python API Example | SQL Example |
|---|---|---|
| Load Data | `df = spark.read.csv("data.csv", header=True)` | ``CREATE OR REPLACE TEMP VIEW data AS SELECT * FROM csv.`data.csv` OPTIONS ('header' 'true')`` |
| Data Cataloging | `spark.catalog.listTables("my_database")` | `SHOW TABLES IN my_database` |
| Data Lineage Tracking | `spark.sql("DESCRIBE HISTORY my_table")` | `DESCRIBE HISTORY my_table` |
| Streaming Data Processing | `df = spark.readStream.format("kafka").option("subscribe", "topic1").load()` | ``CREATE STREAMING TABLE data_stream AS SELECT * FROM kafka.`topic1` `` |
To run SQL against a DataFrame, first register it as a temp view:

```python
df.createOrReplaceTempView("data_view")
result = spark.sql("SELECT col1 FROM data_view")
```
| Use Case | Python API | SQL |
|---|---|---|
| Load Data | `df = spark.read.csv("data.csv", header=True)` | ``CREATE OR REPLACE TEMP VIEW data AS SELECT * FROM csv.`data.csv` OPTIONS ('header' 'true')`` |
| Select Columns | `df.select("col1", "col2")` | `SELECT col1, col2 FROM data` |
| Filter Rows | `df.filter(df["col1"] > 10)` | `SELECT * FROM data WHERE col1 > 10` |
| Add Column | `df.withColumn("new_col", df["col1"] * 2)` | `SELECT *, col1 * 2 AS new_col FROM data` |
| Group & Aggregate | `df.groupBy("col1").agg({"col2": "sum"})` | `SELECT col1, SUM(col2) AS total FROM data GROUP BY col1` |
| Join Tables | `df1.join(df2, df1["key"] == df2["key"], "inner")` | `SELECT * FROM df1 INNER JOIN df2 ON df1.key = df2.key` |
| Sort Data | `df.orderBy(df["col1"].desc())` | `SELECT * FROM data ORDER BY col1 DESC` |
| Write Data | `df.write.format("parquet").save("output.parquet")` | ``CREATE TABLE parquet.`output.parquet` AS SELECT * FROM data`` |
The two approaches can be mixed via `df.selectExpr()`, which accepts SQL expressions inside the DataFrame API:

```python
df.selectExpr("col1", "col2 AS renamed_col").show()
```
```python
raw_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .load()

bronze_writer_query = raw_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpointLocation) \
    .option("path", bronze_path) \
    .trigger(availableNow=True) \
    .start()
```
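The snippet above assumes `bronze_path` and `checkpointLocation` are already defined. One possible convention (an assumption, not shown above) is to derive both from a common base directory per topic, since two streaming queries must never share a checkpoint location:

```python
from pathlib import Path

def bronze_locations(base: Path, topic: str) -> tuple[str, str]:
    """Return (bronze_path, checkpointLocation) for one topic.

    Keeps one checkpoint directory per query, isolated by topic name,
    so restarting a query resumes from its own offsets.
    """
    bronze_path = base / "bronze" / topic
    checkpoint = base / "checkpoints" / "bronze" / topic
    return str(bronze_path), str(checkpoint)

bronze_path, checkpointLocation = bronze_locations(Path("/tmp/lake"), "espresso-machine-events")
print(bronze_path)
print(checkpointLocation)
```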